- 瓶先生
- RabbitMQ中文文档
- RabbitMQ简介
- 安装
- AMQP协议
- 应用教程
- Published with GitBook
主题交换机
原文:Topics
状态:待校对
翻译:Bingjian-Zhu
校对:
为什么需要topic交换机?
(使用Go客户端)
上一篇教程,我们改进了我们的日志系统。我们使用direct
交换机替代了fanout
交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。
尽管direct
交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。
在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix工具syslog就是同时基于严重程度-severity (info/warn/crit…) 和 设备-facility (auth/cron/kern…)来路由日志的。
如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。
为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— topic交换机。
topic交换机
发送到topic
交换机的消息不可以携带随意routing_key
,它的routing_key必须是一个由.
分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。
binding key也必须拥有同样的格式。topic
交换机背后的逻辑跟direct
交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:
*
(星号) 用来表示一个单词.#
(井号) 用来表示任意数量(零个或多个)单词。
下边用图说明:
这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.
分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的:
。
我们创建了三个绑定:Q1的绑定键为 *.orange.*
,Q2的绑定键为 *.*.rabbit
和 lazy.#
。
这三个绑定键被可以总结为:
- Q1 对所有的桔黄色动物都感兴趣。
- Q2 则是对所有的兔子和所有懒惰的动物感兴趣。
一个携带有 quick.orange.rabbit
的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant
的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox
的消息会投递给第一个队列,携带有 lazy.brown.fox
的消息会投递给第二个队列。携带有 lazy.pink.rabbit
的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox
的消息不会投递给任何一个队列。
如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange"
or "quick.orange.male.rabbit"
)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 "lazy.orange.male.rabbit"
有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。
Topic交换机
Topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当
*
(星号) 和#
(井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为。
代码整合
接下来我们会将Topic交换机应用到我们的日志系统中。在开始工作前,我们假设日志的routing key由两个单词组成,routing key看起来是这样的:
代码跟上一篇教程差不多。
emit_log_topic.go的代码:
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
receive_logs_topic.go的代码:
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
执行下边命令 接收所有日志:
go run receive_logs_topic.go "#"
执行下边命令 接收来自”kern“设备的日志:
go run receive_logs_topic.go "kern.*"
执行下边命令 只接收严重程度为”critical“的日志:
go run receive_logs_topic.go "*.critical"
执行下边命令 建立多个绑定:
go run receive_logs_topic.go "kern.*" "*.critical"
执行下边命令 发送路由键为 "kern.critical" 的日志:
go run emit_log_topic.go "kern.critical" "A critical kernel error"
执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键做任何假设,所以你可以在命令中使用超过两个路由键参数。
如果你现在还没被搞晕,想想下边问题:
- 绑定键为
*
的队列会取到一个routing key为空的消息吗? - 绑定键为
#.*
的队列会获取到一个名为..
的路由键的消息吗?它会取到一个routing key为单个单词的消息吗? a.*.#
和a.#
的区别在哪儿?
(完整代码参见emit_logs_topic.go and receive_logs_topic.go)